package kafka_go

import (
	"errors"
	sarama "gitee.com/tym_hmm/go-kafa-shopify-sarama"
	"strings"
)

/**
生产者构建
*/
type factoryProduct struct {
	isAsync       bool //是否是异步处理
	productBuilds []BuildProductApi
	pool          *productClientPool
}

/**
初始化生产者
创建同步生产者
*/
func NewFactoryProduct() *factoryProduct {
	return &factoryProduct{
		isAsync: false,
		pool:    newProductClientPool(),
	}
}

/**
注册生产者
*/
func (this *factoryProduct) Register(productBuild ...BuildProductApi) *factoryProduct {
	if this.productBuilds == nil {
		this.productBuilds = make([]BuildProductApi, 0)
	}
	if productBuild == nil || len(productBuild) == 0 {
		return this
	}
	buildLen := len(productBuild)
	this.productBuilds = make([]BuildProductApi, buildLen)
	copy(this.productBuilds, productBuild)
	return this
}

/**
执行连接
*/
func (this *factoryProduct) Connect() error {
	return this._runConnectTask()
}

/**
发送消息
*/
func (this *factoryProduct) Push(buildName string, topic string, content string) (int32, int64, error) {
	clientIndex, client, err := this.pool.GetClient(buildName)
	if err != nil {
		return 0, 0, err
	}
	msg := this.formatMessage(topic, content)
	if client.IsTransactional() {
		return this._pushTxn(clientIndex, client, msg)
	}
	return this._push(clientIndex, client, msg)
}

func (this *factoryProduct) _pushTxn(clientIndex int32, client sarama.SyncProducer, msg *sarama.ProducerMessage) (int32, int64, error) {
	err := client.BeginTxn()
	if err != nil {
		return 0, 0, err
	}

	if client.TxnStatus()&sarama.ProducerTxnFlagInTransaction == 0 {
		return 0, 0, errors.New("transaction must started")
	}
	partition, offset, err := this._send(clientIndex, client, msg)
	if err != nil {
		errs := client.AbortTxn()
		if errs != nil {
			return 0, 0, errs
		}
		return 0, 0, err
	}
	errd := client.CommitTxn()
	if errd != nil {
		return 0, 0, errd
	}
	return partition, offset, nil
}

func (this *factoryProduct) _push(clientIndex int32, client sarama.SyncProducer, msg *sarama.ProducerMessage) (int32, int64, error) {
	return this._send(clientIndex, client, msg)
}

func (this *factoryProduct) _send(clientIndex int32, client sarama.SyncProducer, msg *sarama.ProducerMessage) (int32, int64, error) {
	if client != nil {
		partition, offset, err := client.SendMessage(msg)
		//log.DebugNoFileInfo("send sync clientIndex:%d, partition:%d, offset:%d", clientIndex, partition, offset)
		return partition, offset, err
	} else {
		return 0, 0, KAFKA_PRODUCT_ERROR_BUILD_CLIENT_NIL
	}
}

func (this *factoryProduct) formatMessage(topic string, content string) *sarama.ProducerMessage {
	topic = strings.TrimSpace(topic)
	msg := &sarama.ProducerMessage{}
	msg.Topic = topic
	msg.Value = sarama.StringEncoder(content)
	return msg
}

//创建连接
func (this *factoryProduct) _runConnectTask() error {
	//创建同步请求
	return this._createSync()
}

/**
创建同步请求客户端
*/
func (this *factoryProduct) _createSync() error {
	if this.productBuilds == nil || len(this.productBuilds) == 0 {
		return KAFKA_PRODUCT_ERROR_BUILD_EMPTY
	}
	for _, v := range this.productBuilds {
		_v := v
		err := this.pool.addClient(_v)
		if err != nil {
			return err
		}
	}
	return nil
}

/**
创建异步
*/
func (this *factoryProduct) _createAsync() {

}
